ExternalShuffleService
ExternalShuffleService is an external shuffle service that serves shuffle blocks from outside an Executor process. It runs as a standalone application and manages shuffle output files so they are available to executors at all times. Because the shuffle output files are managed outside the executors, access to them is uninterrupted even when executors are killed or down.
You start ExternalShuffleService using the start-shuffle-service.sh shell script and enable its use by the driver and executors using spark.shuffle.service.enabled.
Note: There is a custom external shuffle service for Spark on YARN, YarnShuffleService.
Tip: Enable INFO logging level for the org.apache.spark.deploy.ExternalShuffleService logger to see what happens inside. Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.deploy.ExternalShuffleService=INFO
Refer to Logging.
start-shuffle-service.sh Shell Script
start-shuffle-service.sh
The start-shuffle-service.sh shell script launches ExternalShuffleService. The script is in the sbin directory.
When executed, it runs the sbin/spark-config.sh and bin/load-spark-env.sh shell scripts. It then executes sbin/spark-daemon.sh with the start command and two parameters: org.apache.spark.deploy.ExternalShuffleService and 1.
$ ./sbin/start-shuffle-service.sh
starting org.apache.spark.deploy.ExternalShuffleService, logging to ...logs/spark-jacek-org.apache.spark.deploy.ExternalShuffleService-1-japila.local.out
$ tail -f ...logs/spark-jacek-org.apache.spark.deploy.ExternalShuffleService-1-japila.local.out
Spark Command: /Library/Java/JavaVirtualMachines/Current/Contents/Home/bin/java -cp /Users/jacek/dev/oss/spark/conf/:/Users/jacek/dev/oss/spark/assembly/target/scala-2.11/jars/* -Xmx1g org.apache.spark.deploy.ExternalShuffleService
========================================
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/06/07 08:02:02 INFO ExternalShuffleService: Started daemon with process name: [email protected]
16/06/07 08:02:03 INFO ExternalShuffleService: Starting shuffle service on port 7337 with useSasl = false
Tip: You can also use spark-class to launch ExternalShuffleService.
Launching ExternalShuffleService — main Method
When started, it executes Utils.initDaemon(log).
Caution: FIXME Utils.initDaemon(log)? See spark-submit.
It loads default Spark properties and creates a SecurityManager.
It sets spark.shuffle.service.enabled to true (as later it is checked whether it is enabled or not).
A shutdown hook is registered so when ExternalShuffleService is shut down, it prints the following INFO message to the logs and the stop method is executed.
INFO ExternalShuffleService: Shutting down shuffle service.
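The shutdown behaviour described above can be sketched with a plain JVM shutdown hook. This is a minimal illustration rather than Spark's code: the class ShutdownHookSketch and its methods are hypothetical, and Spark may register its hook through its own utilities instead of calling Runtime.addShutdownHook directly.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the service; not Spark's ExternalShuffleService.
class ShutdownHookSketch {
    private final AtomicBoolean running = new AtomicBoolean(true);

    boolean isRunning() {
        return running.get();
    }

    // Stand-in for the stop method: marks the service as stopped.
    void stop() {
        running.set(false);
    }

    // Builds the hook thread: print the INFO message, then call stop.
    Thread newShutdownHook() {
        return new Thread(() -> {
            System.out.println("INFO ExternalShuffleService: Shutting down shuffle service.");
            stop();
        });
    }

    // Registers the hook with the JVM so it runs when the process exits.
    void registerShutdownHook() {
        Runtime.getRuntime().addShutdownHook(newShutdownHook());
    }
}
```

Keeping the hook construction separate from its registration makes the behaviour easy to exercise without exiting the JVM.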
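Tip: Enable DEBUG logging level for the org.apache.spark.network.shuffle.ExternalShuffleBlockResolver logger to see what happens inside. Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.network.shuffle.ExternalShuffleBlockResolver=DEBUG
Refer to Logging.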
You should see the following INFO message in the logs:
INFO ExternalShuffleBlockResolver: Registered executor [AppExecId] with [executorInfo]
You should also see the following messages when a SparkContext is closed:
INFO ExternalShuffleBlockResolver: Application [appId] removed, cleanupLocalDirs = [cleanupLocalDirs]
INFO ExternalShuffleBlockResolver: Cleaning up executor [AppExecId]'s [executor.localDirs.length] local dirs
DEBUG ExternalShuffleBlockResolver: Successfully cleaned up directory: [localDir]
Creating ExternalShuffleService Instance
ExternalShuffleService requires a SparkConf and SecurityManager.
When created, it reads spark.shuffle.service.enabled (disabled by default) and spark.shuffle.service.port (defaults to 7337) configuration settings. It also checks whether authentication is enabled.
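The two settings and their defaults can be sketched as follows. This is only an illustration: a plain Map stands in for SparkConf, and the class ShuffleServiceSettings is hypothetical.

```java
import java.util.Map;

// Hypothetical holder for the two settings; a plain Map stands in for SparkConf.
class ShuffleServiceSettings {
    final boolean enabled;
    final int port;

    ShuffleServiceSettings(Map<String, String> conf) {
        // Disabled unless explicitly set to "true".
        this.enabled = Boolean.parseBoolean(
            conf.getOrDefault("spark.shuffle.service.enabled", "false"));
        // Falls back to 7337 when no port is configured.
        this.port = Integer.parseInt(
            conf.getOrDefault("spark.shuffle.service.port", "7337"));
    }
}
```

With an empty map the sketch yields enabled = false and port = 7337, matching the defaults named above.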
Caution: FIXME Review securityManager.isAuthenticationEnabled()
It then creates a TransportConf (as transportConf).
It creates an ExternalShuffleBlockHandler (as blockHandler) and a TransportContext (as transportContext).
Caution: FIXME TransportContext?
No internal TransportServer (as server) is created.
Starting ExternalShuffleService — start Method
start(): Unit
start starts an ExternalShuffleService.
When start is executed, you should see the following INFO message in the logs:
INFO ExternalShuffleService: Starting shuffle service on port [port] with useSasl = [useSasl]
If useSasl is enabled, a SaslServerBootstrap is created.
Caution: FIXME SaslServerBootstrap?
The internal server reference (a TransportServer) is created (which will attempt to bind to port).
Stopping ExternalShuffleService — stop Method
stop(): Unit
stop closes the internal server reference and clears it (i.e. sets it to null).
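The start and stop lifecycle of the internal server reference can be sketched like this. It is a simplified illustration, not Spark's implementation: ServiceLifecycleSketch is hypothetical, a generic Closeable stands in for TransportServer, and the guard against starting twice is an assumption of the sketch.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

// Hypothetical lifecycle sketch; a Closeable stands in for TransportServer.
class ServiceLifecycleSketch {
    private final Supplier<Closeable> serverFactory;
    private Closeable server; // null until start() is called

    ServiceLifecycleSketch(Supplier<Closeable> serverFactory) {
        this.serverFactory = serverFactory;
    }

    // Creates the server reference (in Spark, this is where the port is bound).
    void start() {
        if (server != null) {
            throw new IllegalStateException("Shuffle server already started");
        }
        server = serverFactory.get();
    }

    // Closes the server and clears the reference, i.e. sets it to null.
    void stop() {
        if (server == null) {
            return;
        }
        try {
            server.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            server = null;
        }
    }

    boolean isStarted() {
        return server != null;
    }
}
```

Clearing the reference in stop is what allows the same instance to be started again later.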
ExternalShuffleBlockHandler
ExternalShuffleBlockHandler is a RpcHandler (i.e. a handler for sendRPC() messages sent by TransportClients).
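When created, ExternalShuffleBlockHandler requires a OneForOneStreamManager and a TransportConf with a registeredExecutorFile to create an ExternalShuffleBlockResolver.
Tip: Enable TRACE logging level for the org.apache.spark.network.shuffle.ExternalShuffleBlockHandler logger to see what happens inside. Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.network.shuffle.ExternalShuffleBlockHandler=TRACE
Refer to Logging.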
handleMessage Method
handleMessage(
BlockTransferMessage msgObj,
TransportClient client,
RpcResponseCallback callback)
handleMessage handles two types of BlockTransferMessage messages: OpenBlocks and RegisterExecutor.
For any other BlockTransferMessage message it throws an UnsupportedOperationException:
Unexpected message: [msgObj]
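The dispatch described above can be sketched as a chain of type checks. The nested message classes are simplified, hypothetical stand-ins for Spark's message types, and the returned strings exist only to make the branches visible.

```java
// Hypothetical dispatch sketch; not Spark's ExternalShuffleBlockHandler.
class MessageDispatchSketch {
    // Simplified stand-ins for BlockTransferMessage and its subtypes.
    abstract static class Message {}

    static class OpenBlocks extends Message {
        final String[] blockIds;
        OpenBlocks(String... blockIds) { this.blockIds = blockIds; }
    }

    static class RegisterExecutor extends Message {
        final String execId;
        RegisterExecutor(String execId) { this.execId = execId; }
    }

    static class OtherMessage extends Message {
        @Override public String toString() { return "OtherMessage"; }
    }

    // Handles the two supported types and rejects everything else.
    static String handleMessage(Message msgObj) {
        if (msgObj instanceof OpenBlocks) {
            OpenBlocks msg = (OpenBlocks) msgObj;
            return "opened " + msg.blockIds.length + " blocks";
        } else if (msgObj instanceof RegisterExecutor) {
            RegisterExecutor msg = (RegisterExecutor) msgObj;
            return "registered executor " + msg.execId;
        } else {
            throw new UnsupportedOperationException("Unexpected message: " + msgObj);
        }
    }
}
```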
OpenBlocks
OpenBlocks(String appId, String execId, String[] blockIds)
When OpenBlocks is received, handleMessage authorizes the client.
Caution: FIXME checkAuth?
It then gets block data for each block id in blockIds (using ExternalShuffleBlockResolver).
Finally, it registers a stream and calls callback.onSuccess with a serialized byte buffer (holding the streamId and the number of blocks in msg).
Caution: FIXME callback.onSuccess?
You should see the following TRACE message in the logs:
TRACE Registered streamId [streamId] with [length] buffers for client [clientId] from host [remoteAddress]
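The last OpenBlocks step, registering a stream and replying with its id and block count, can be sketched as below. Everything here is hypothetical: StreamRegistrySketch is not OneForOneStreamManager, and the 12-byte layout is an illustrative encoding chosen for the sketch, not Spark's wire format.

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stream registry; not Spark's OneForOneStreamManager.
class StreamRegistrySketch {
    private final AtomicLong nextStreamId = new AtomicLong(0);
    private final Map<Long, List<byte[]>> streams = new ConcurrentHashMap<>();

    // Registers the blocks as one stream and returns the new stream id.
    long registerStream(List<byte[]> blocks) {
        long streamId = nextStreamId.getAndIncrement();
        streams.put(streamId, blocks);
        return streamId;
    }

    // Illustrative response layout: 8-byte streamId, then 4-byte block count.
    static ByteBuffer encodeHandle(long streamId, int numBlocks) {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
        buf.putLong(streamId).putInt(numBlocks);
        buf.flip(); // switch from writing to reading
        return buf;
    }
}
```

The client would read the id and count back from the buffer and then fetch each block of the stream by index.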
ExternalShuffleBlockResolver
Caution: FIXME
getBlockData Method
ManagedBuffer getBlockData(String appId, String execId, String blockId)
getBlockData parses blockId (in the format of shuffle_[shuffleId]_[mapId]_[reduceId]) and returns the FileSegmentManagedBuffer that corresponds to shuffle_[shuffleId]_[mapId]_0.data.
getBlockData splits blockId into four parts on _ (underscore). It works only with shuffle block ids: the first part must be shuffle, and the other three are shuffleId, mapId, and reduceId.
It looks up an executor (i.e. an ExecutorShuffleInfo in the executors private registry) for appId and execId to search for a ManagedBuffer.
The ManagedBuffer is indexed using a binary file shuffle_[shuffleId]_[mapId]_0.index (that contains offset and length of the buffer) with a data file being shuffle_[shuffleId]_[mapId]_0.data (that is returned as FileSegmentManagedBuffer).
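The index lookup can be sketched as follows. The sketch assumes the index file is a run of 8-byte big-endian offsets, one per reduce partition plus a final end offset, so that a segment's length is the difference between two consecutive offsets. That layout is an assumption made for illustration, and IndexLookupSketch is a hypothetical class.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical index reader; the file layout is an assumption of this sketch.
class IndexLookupSketch {
    // Returns {offset, length} of the data-file segment for reduceId.
    static long[] segmentFor(byte[] indexBytes, int reduceId) {
        ByteBuffer index = ByteBuffer.wrap(indexBytes); // big-endian by default
        long offset = index.getLong(reduceId * Long.BYTES);
        long nextOffset = index.getLong((reduceId + 1) * Long.BYTES);
        return new long[] {offset, nextOffset - offset};
    }

    // Convenience overload that reads the whole index file first.
    static long[] segmentFor(Path indexFile, int reduceId) {
        try {
            return segmentFor(Files.readAllBytes(indexFile), reduceId);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For an index holding the offsets 0, 10, 25, 25, reduce partition 1 maps to offset 10 and length 15 in the data file.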
It throws an IllegalArgumentException for block ids with fewer than four parts:
Unexpected block id format: [blockId]
or for non-shuffle block ids:
Expected shuffle block id, got: [blockId]
It throws a RuntimeException when no ExecutorShuffleInfo could be found:
Executor is not registered (appId=[appId], execId=[execId])
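The block id parsing and its two format checks can be sketched as below. BlockIdSketch is a hypothetical class written for illustration. It reproduces the error messages quoted above and derives the data and index file names, but leaves out the executor lookup.

```java
// Hypothetical parser for shuffle block ids; not Spark's resolver code.
class BlockIdSketch {
    final int shuffleId;
    final int mapId;
    final int reduceId;

    private BlockIdSketch(int shuffleId, int mapId, int reduceId) {
        this.shuffleId = shuffleId;
        this.mapId = mapId;
        this.reduceId = reduceId;
    }

    // Parses shuffle_[shuffleId]_[mapId]_[reduceId].
    static BlockIdSketch parse(String blockId) {
        String[] parts = blockId.split("_");
        if (parts.length < 4) {
            throw new IllegalArgumentException("Unexpected block id format: " + blockId);
        }
        if (!parts[0].equals("shuffle")) {
            throw new IllegalArgumentException("Expected shuffle block id, got: " + blockId);
        }
        return new BlockIdSketch(
            Integer.parseInt(parts[1]),
            Integer.parseInt(parts[2]),
            Integer.parseInt(parts[3]));
    }

    // All reduce partitions of one map output share a single data file.
    String dataFileName() {
        return "shuffle_" + shuffleId + "_" + mapId + "_0.data";
    }

    String indexFileName() {
        return "shuffle_" + shuffleId + "_" + mapId + "_0.index";
    }
}
```

Note that reduceId does not appear in either file name. It only selects the segment inside the data file.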
Settings
| Spark Property | Default Value | Description |
|---|---|---|
| spark.shuffle.service.enabled | false | Enables External Shuffle Service. Used to enable dynamic allocation of executors and in CoarseMesosSchedulerBackend. |
| spark.shuffle.service.port | 7337 | Port that the external shuffle service starts on. |